Multiprocess Learning (Ape-X)

Since version 9.4.2, the new classes MPReplayBuffer and MPPrioritizedReplayBuffer efficiently support multiprocess learning in the style of Ape-X (a single learner with multiple explorers) on a single machine.

1 Shared Memory

First of all, MPReplayBuffer and MPPrioritizedReplayBuffer (Multi-Process ReplayBuffer) map their internal data onto shared memory. This means you don’t need a proxy (e.g. multiprocessing.managers.SyncManager) or a queue (e.g. multiprocessing.Queue) for interprocess data sharing; you can simply access the buffer object from different processes.

from multiprocessing import Process
from cpprb import MPPrioritizedReplayBuffer

rb = MPPrioritizedReplayBuffer(100, {"obs": {}, "done": {}})

def explorer(rb):
    for _ in range(100):
        # Something ... (e.g. interact with the environment to get obs and done)
        obs, done = 1.0, 0.0
        rb.add(obs=obs, done=done)

p = Process(target=explorer, args=[rb]) # You can simply pass the buffer to Process as an argument
p.start()
p.join()

sample = rb.sample(10) # You can access data stored by a different process

2 Efficient Lock

Although you could implement Ape-X with the ordinary ReplayBuffer or PrioritizedReplayBuffer classes, locking the entire buffer for every write and read is quite inefficient.

# Part of a Naive Explorer Implementation
if local_buffer.get_stored_size() > local_size:
    local_sample = local_buffer.get_all_transitions()
    local_buffer.clear()

    with lock: # Inefficient: Lock entire buffer during addition
        global_buffer.add(**local_sample)

MPReplayBuffer and MPPrioritizedReplayBuffer automatically lock only the critical section instead of the entire buffer. For example, since sequential add calls write to different memory addresses, the critical section covers only fetching and incrementing the next write index. This reduction in locking allows multiple explorers to add transitions in parallel.1
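Conceptually, the reduced locking works like the following sketch (for illustration only, not cpprb’s actual implementation; next_index, index_lock, and data are hypothetical names):

import numpy as np
from multiprocessing import Lock, Value

capacity = 100
next_index = Value("q", 0, lock=False)  # shared write cursor (hypothetical)
index_lock = Lock()
data = np.zeros(capacity)               # imagine this array lives in shared memory

def add(value):
    with index_lock:                    # critical section: only reserve a slot
        i = next_index.value % capacity
        next_index.value += 1
    data[i] = value                     # the actual (slow) write happens outside the lock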

We adopt an exclusive-read, concurrent-write model for access control. Multiple writers may write in parallel, and the number of writers inside the critical section is traced atomically. Reading has higher priority and prevents writers (i.e. actors) from entering the critical section again. Once all writers have exited the critical section, the reader (i.e. learner) starts working in it.

We restrict the number of learners to one. If we allowed multiple learners, which have higher priority, actors might never enter the critical section, which is undesirable for reinforcement learning.
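The access-control model can be sketched roughly as follows (again an illustration only, with hypothetical names such as writer_count and no_reader, not cpprb’s internal code):

import time
from multiprocessing import Event, Lock, Value

writer_count = Value("i", 0, lock=False)  # writers currently inside the critical section
count_lock = Lock()
no_reader = Event()                       # set while no reader wants the buffer
no_reader.set()

def writer_enter():
    while True:
        no_reader.wait()                  # a pending reader blocks new writers
        with count_lock:
            if no_reader.is_set():        # re-check under the lock to avoid a race
                writer_count.value += 1
                return

def writer_exit():
    with count_lock:
        writer_count.value -= 1

def read_exclusively(read):
    no_reader.clear()                     # reading has priority: stop new writers
    while True:                           # wait until all in-flight writers leave
        with count_lock:
            if writer_count.value == 0:
                break
        time.sleep(0.001)
    read()                                # exclusive read
    no_reader.set()                       # allow writers again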

3 Limitations

MPReplayBuffer and MPPrioritizedReplayBuffer don’t support the Nstep Experience Replay, Memory Compression, or Map Data on File features. (You can still use these features in the explorers' local buffers.)

MPReplayBuffer and MPPrioritizedReplayBuffer assume a single learner (calling sample / update_priorities) and multiple explorers (calling add). You must not call the learner methods from multiple processes simultaneously.

4 Context and Backend

Since version 10.6, MPReplayBuffer and MPPrioritizedReplayBuffer accept two new keyword arguments in their constructors.

ctx
    Default: None
    Type: Optional[Union[ForkContext, SpawnContext, SyncManager]]
    Description: Context used for Event, Lock, etc. If None (default), the default context is used. The context passed must match the context used for Process. When a SyncManager is passed, Event and Lock are accessed through proxies.

backend
    Default: "sharedctypes"
    Type: "sharedctypes" or "SharedMemory" (Python 3.8+ only)
    Description: Backend for shared memory.
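For example, constructing a buffer with a spawn context and the SharedMemory backend might look like this (a minimal sketch; the environment dictionary is arbitrary):

import multiprocessing as mp
from cpprb import MPPrioritizedReplayBuffer

def explorer(rb):
    # With spawn, the buffer is pickled and sent to the child process,
    # which works because the "SharedMemory" backend is serializable.
    rb.add(obs=1.0, done=0.0)

if __name__ == "__main__":
    ctx = mp.get_context("spawn")
    rb = MPPrioritizedReplayBuffer(100, {"obs": {}, "done": {}},
                                   ctx=ctx, backend="SharedMemory")

    p = ctx.Process(target=explorer, args=[rb])  # Process from the same context
    p.start()
    p.join()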

4.1 Context Detail

The context specifies how cpprb manages shared data and synchronization.

import multiprocessing as mp

default_context = mp.get_context()
fork_context = mp.get_context("fork") # only on Linux and macOS
spawn_context = mp.get_context("spawn")

On Linux, fork is the default context: the parent process itself is duplicated with a copy-on-write strategy, so subprocesses usually start faster, but it can be problematic if a library such as TensorFlow has already started background threads.

On macOS and Windows, spawn is the default context, where only the necessary objects are copied (via pickle) to a fresh new process. The disadvantage is not only the slower start-up but also that users must define classes and functions at the top level of a module (see pickle).
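For instance, the following generic pattern works with spawn because work is defined at the top level of the module and is therefore picklable (not specific to cpprb):

import multiprocessing as mp

def work(x):
    # must be defined at module top level so that spawn can pickle it
    print(x * 2)

if __name__ == "__main__":
    ctx = mp.get_context("spawn")
    p = ctx.Process(target=work, args=(1,))
    p.start()
    p.join()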

Strictly speaking, SyncManager is not a start method like fork and spawn. It actually starts a new process with one of those methods and then provides proxy objects to access the original objects placed in that process. SyncManager is generally slower than the others because it requires interprocess communication. If you specify a SyncManager, only the synchronization objects (Lock and Event) become proxy based; the main internal data are still placed on shared memory.
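Passing a SyncManager as the context might look like this (a minimal sketch):

import multiprocessing as mp
from cpprb import MPPrioritizedReplayBuffer

if __name__ == "__main__":
    manager = mp.Manager()  # a SyncManager instance
    # Lock and Event are accessed through proxies; the buffer data stays in shared memory.
    rb = MPPrioritizedReplayBuffer(100, {"obs": {}, "done": {}}, ctx=manager)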

4.2 Backend Detail

SharedMemory was introduced in Python 3.8 (Ref). This backend can be serialized, so it works with pickle-based multiprocessing such as Ray. (See example)

On Linux, SharedMemory is mapped under the /dev/shm directory via shm_open. If the directory doesn’t have enough space, the program dies with a Bus error, which (as far as we know) cannot be handled correctly by a Python program (similar to a segmentation fault). This is often the case inside a Docker container. You can increase the size with docker run -it --shm-size 1G python:3.9 bash etc.

On the other hand, sharedctypes cannot be serialized. It can be passed to a subprocess only through process creation by fork or spawn (e.g. Process).

The advantage of sharedctypes is its fallback mechanism, which can create shared memory even when the /dev/shm directory doesn’t have enough space.
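As a rough guideline, the backend could be chosen like this (a sketch; need_pickle is a hypothetical flag indicating whether the buffer has to be serialized, e.g. for Ray):

import sys
from cpprb import MPReplayBuffer

# "SharedMemory" requires Python 3.8+ and is needed when the buffer must be pickled;
# otherwise the default "sharedctypes" (with its /dev/shm fallback) is sufficient.
need_pickle = False  # e.g. True when sending the buffer to Ray workers
backend = "SharedMemory" if (need_pickle and sys.version_info >= (3, 8)) else "sharedctypes"

rb = MPReplayBuffer(1000, {"obs": {"shape": 4}}, backend=backend)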

5 Example Code

from multiprocessing import Process, Event, SimpleQueue
import time

import gym
import numpy as np
from tqdm import tqdm

from cpprb import ReplayBuffer, MPPrioritizedReplayBuffer


class MyModel:
    def __init__(self):
        self._weights = 0

    def get_action(self,obs):
        # Implement action selection
        return 0

    def abs_TD_error(self,sample):
        # Implement absolute TD error
        return np.zeros(sample["obs"].shape[0])

    @property
    def weights(self):
        return self._weights

    @weights.setter
    def weights(self,w):
        self._weights = w

    def train(self,sample):
        # Implement model update
        pass


def explorer(global_rb,env_dict,is_training_done,queue):
    local_buffer_size = int(1e+2)
    local_rb = ReplayBuffer(local_buffer_size,env_dict)

    model = MyModel()
    env = gym.make("CartPole-v1")

    obs = env.reset()
    while not is_training_done.is_set():
        if not queue.empty():
            w = queue.get()
            model.weights = w

        action = model.get_action(obs)
        next_obs, reward, done, _ = env.step(action)
        local_rb.add(obs=obs,act=action,rew=reward,next_obs=next_obs,done=done)

        if done:
            local_rb.on_episode_end()
            obs = env.reset()
        else:
            obs = next_obs

        if local_rb.get_stored_size() == local_buffer_size:
            local_sample = local_rb.get_all_transitions()
            local_rb.clear()

            absTD = model.abs_TD_error(local_sample)
            global_rb.add(**local_sample,priorities=absTD)


def learner(global_rb,queues):
    batch_size = 64
    n_warmup = 100
    n_training_step = int(1e+4)
    explorer_update_freq = 100

    model = MyModel()

    while global_rb.get_stored_size() < n_warmup:
        time.sleep(1)

    for step in tqdm(range(n_training_step)):
        sample = global_rb.sample(batch_size)

        model.train(sample)
        absTD = model.abs_TD_error(sample)
        global_rb.update_priorities(sample["indexes"],absTD)

        if step % explorer_update_freq == 0:
            w = model.weights
            for q in queues:
                q.put(w)


if __name__ == "__main__":
    buffer_size = int(1e+6)
    env_dict = {"obs": {"shape": 4},
                "act": {},
                "rew": {},
                "next_obs": {"shape": 4},
                "done": {}}
    n_explorer = 4

    global_rb = MPPrioritizedReplayBuffer(buffer_size,env_dict)

    is_training_done = Event()
    is_training_done.clear()

    qs = [SimpleQueue() for _ in range(n_explorer)]
    ps = [Process(target=explorer,
                  args=[global_rb,env_dict,is_training_done,q])
          for q in qs]

    for p in ps:
        p.start()

    learner(global_rb,qs)
    is_training_done.set()

    for p in ps:
        p.join()

    print(global_rb.get_stored_size())

  1. Updating the segment tree for PER is a critical section, too. To avoid data races, MPPrioritizedReplayBuffer lazily updates the segment tree from the learner process just before the sample method. ↩︎